package com.iteaj.iot.server.handle;

import com.iteaj.iot.CoreConst;
import com.iteaj.iot.DeviceManager;
import com.iteaj.iot.IotServeBootstrap;
import com.iteaj.iot.Message;
import com.iteaj.iot.codec.filter.CombinedFilter;
import com.iteaj.iot.event.ClientStatus;
import com.iteaj.iot.event.ClientStatusEvent;
import com.iteaj.iot.message.UnParseBodyMessage;
import com.iteaj.iot.server.ServerComponentFactory;
import com.iteaj.iot.server.SocketServerComponent;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.Attribute;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Optional;

/**
 * <p>Event manager handler.</p>
 * Manages the connection-level events of the platform: idle timeouts, device
 * registration on the first readable message, channel activation and channel
 * inactivation. The handler is marked {@code @Sharable} and is handed out as a
 * single shared instance through {@link #getInstance(ServerComponentFactory)}.
 *
 * @author iteaj
 * @since 1.8
 */
@ChannelHandler.Sharable
public class EventManagerHandler extends SimpleChannelInboundHandler<UnParseBodyMessage> {

    private static EventManagerHandler managerHandler;
    private final ServerComponentFactory componentFactory;
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Returns the shared handler instance, creating it on the first call.
     * The method is synchronized so that concurrent first calls cannot create
     * two instances.
     *
     * @param componentFactory factory used to build the instance on the first call;
     *                         it is ignored once the instance already exists
     * @return the shared handler
     */
    public static synchronized EventManagerHandler getInstance(ServerComponentFactory componentFactory) {
        if(managerHandler == null) {
            managerHandler = new EventManagerHandler(componentFactory);
        }
        return managerHandler;
    }

    protected EventManagerHandler(ServerComponentFactory componentFactory) {
        this.componentFactory = componentFactory;
    }

    /**
     * Handles idle (heartbeat) events by closing the timed-out channel.
     * Events of any other type are not consumed here; they are passed on to the
     * next handler in the pipeline.
     *
     * @param ctx handler context of the channel that raised the event
     * @param evt the user event
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(!(evt instanceof IdleStateEvent)) {
            // Not an idle event: keep it flowing through the pipeline instead of swallowing it
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleState state = ((IdleStateEvent) evt).state();
        switch (state){
            case READER_IDLE: timeoutHandle(ctx, "读超时");
                break;
            case WRITER_IDLE: timeoutHandle(ctx, "写超时");
                break;
            case ALL_IDLE: timeoutHandle(ctx, "读写超时");
                break;
            default:
                break;
        }
    }

    /**
     * Closes a channel that hit a read/write idle timeout and, once the close
     * completes successfully, removes the device bound to it from the device
     * manager of the owning component.
     *
     * @param ctx  handler context of the timed-out channel
     * @param desc description of the timeout kind, used only for logging
     */
    protected void timeoutHandle(final ChannelHandlerContext ctx, String desc){
        ctx.channel().close().addListener((ChannelFutureListener) future -> {
            Channel channel = ctx.channel();
            String equipCode = (String) channel.attr(CoreConst.EQUIP_CODE).get();
            final String target = hostAndPort((InetSocketAddress) channel.remoteAddress());
            // Resolved up front because both the success and the failure log need the component name
            SocketServerComponent component = resolveComponent(channel);

            if(future.isSuccess()){
                // Remove the matching device when the connection has been closed
                if(component != null && StringUtils.isNotBlank(equipCode)) {
                    DeviceManager deviceManager = component.getDeviceManager();
                    deviceManager.remove(equipCode);
                }

                if(logger.isDebugEnabled()) {
                    logger.debug("客户端超时下线({}) {} - 客户端编号: {} - 客户端地址: {} - 状态: 下线成功"
                            , nameOf(component), desc, equipCode, target);
                }
            } else if(logger.isWarnEnabled()) {
                // Four placeholders, four arguments: the component name comes first
                logger.warn("客户端超时下线({}) {} - 客户端编号: {} - 客户端地址: {} - 状态: 下线失败"
                        , nameOf(component), desc, equipCode, target);
            }
        });
    }

    /**
     * Registers the device that sent the message (if it is not registered yet),
     * replaces an older channel that is registered under the same device code,
     * and then forwards the message to the next handler.
     *
     * @param ctx handler context of the receiving channel
     * @param msg the message; it must carry a head with a device code
     * @throws Exception if registration or forwarding fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, UnParseBodyMessage msg) throws Exception {
        if(msg.getHead() == null || msg.getHead().getEquipCode() == null) {
            logger.error("事件管理器错误, 没办法注册设备到设备管理器, 请检查报文是否包含报文头及报文头包含设备编号 - 报文: {}", msg);
            return;
        }

        SocketServerComponent component = componentFactory.getByClass(msg.getClass());
        CombinedFilter filter = (CombinedFilter) component.getFilter().orElse(CombinedFilter.DEFAULT);

        // Device code already bound to this channel, if any
        String equipCode = (String) ctx.channel().attr(CoreConst.EQUIP_CODE).get();
        if(null == equipCode) {
            // First message on this channel: the filter returns the head whose code is used for registration
            Message.MessageHead messageHead = filter.register(msg.getHead(), null, ctx.channel(), component);
            equipCode = messageHead.getEquipCode();

            // Bind the device code to the channel
            ctx.channel().attr(CoreConst.EQUIP_CODE).setIfAbsent(equipCode);
        } else {
            filter.register(msg.getHead(), equipCode, ctx.channel(), component);
        }

        DeviceManager deviceManager = component.getDeviceManager();
        Channel channel = deviceManager.find(equipCode);
        if(null == channel){
            // The device is not in the device manager yet, so register it
            deviceManager.add(equipCode, ctx.channel());

            // Publish the device-online event
            IotServeBootstrap.publishApplicationEvent(new ClientStatusEvent(equipCode
                    , ClientStatus.online, component));

            if(logger.isDebugEnabled()) {
                logger.debug("客户端上线({}) - 客户端编号: {} - 服务: {}"
                        , component.getName(), equipCode, component.getDesc());
            }
        } else if(channel != ctx.channel()) {
            // Another channel is registered under the same code: close the earlier one
            final String deviceSn = equipCode; // effectively-final copy for the listener lambda
            channel.close().addListener((ChannelFutureListener) future -> {
                String status = future.isSuccess() ? "成功" : "失败";
                if(logger.isWarnEnabled()) {
                    logger.warn("客户端冲突({}) - 客户端编号: {} - 处理: 移除早期的一台 - 关闭状态：{}"
                            , component.getName(), deviceSn, status);
                }
            });

            // Drop the earlier registration first
            deviceManager.remove(equipCode);
            // Then register the newest channel
            deviceManager.add(equipCode, ctx.channel());

            // Publish the device-online event
            IotServeBootstrap.publishApplicationEvent(new ClientStatusEvent(equipCode, ClientStatus.online, component));
        }

        ctx.fireChannelRead(msg);
    }

    /**
     * Asks the component's filter whether the new connection may be activated.
     * A rejected channel is closed and the activation is not propagated; an
     * accepted channel is added to the component's device manager before the
     * activation is forwarded.
     *
     * @param ctx handler context of the newly active channel
     * @throws Exception if forwarding the activation fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
        SocketServerComponent component = componentFactory.getByPort(localAddress.getPort());
        CombinedFilter filter = (CombinedFilter) component.getFilter().orElse(CombinedFilter.DEFAULT);
        if(!filter.isActivation(channel, component)) {
            channel.close();
        } else {
            DeviceManager deviceManager = component.getDeviceManager();
            deviceManager.add(channel);
            int activeCount = deviceManager.size();

            if(logger.isTraceEnabled()) {
                // getHostString() avoids the reverse name lookup that getHostName() may perform
                logger.trace("客户端激活 客户端地址：{} - 当前连接数： {}"
                        , hostAndPort((InetSocketAddress) channel.remoteAddress()), activeCount);
            }
            super.channelActive(ctx);
        }
    }

    /**
     * Publishes a device-offline event when the closed channel had a device code
     * bound to it and logs the disconnect. The inactivation is always forwarded
     * to the next handler, even if the handling above throws.
     *
     * @param ctx handler context of the channel that became inactive
     * @throws Exception if forwarding the inactivation fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        try {
            Channel channel = ctx.channel();
            SocketServerComponent serverComponent = resolveComponent(channel);
            InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();

            Object equipCode = channel.attr(CoreConst.EQUIP_CODE).get();
            if(equipCode != null) {
                // Publish the device-offline event
                IotServeBootstrap.publishApplicationEvent(new ClientStatusEvent(equipCode, ClientStatus.offline, serverComponent));

                // Null-safe formatting so a missing address or component cannot break the disconnect log
                logger.warn("客户端断线({}) 客户端编号: {} - 客户端地址: {}", nameOf(serverComponent)
                        , equipCode, hostAndPort(remoteAddress));
            } else if(logger.isWarnEnabled() && remoteAddress != null && serverComponent != null) {
                // No device code was ever bound to this channel
                logger.warn("客户端断线({}) 客户端异常 - 客户端地址: {}", serverComponent.getName()
                        , hostAndPort(remoteAddress));
            }
        } finally {
            super.channelInactive(ctx);
        }
    }

    /**
     * Looks up the server component bound to the channel's local port.
     *
     * @return the component, or {@code null} when the channel has no local address
     *         or the factory returns none for the port
     */
    private SocketServerComponent resolveComponent(Channel channel) {
        InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
        return localAddress != null ? componentFactory.getByPort(localAddress.getPort()) : null;
    }

    /** Returns the component's name for logging, or {@code null} when there is no component. */
    private static Object nameOf(SocketServerComponent component) {
        return component != null ? component.getName() : null;
    }

    /** Formats an address as {@code host:port} without a name lookup; {@code null} stays {@code null}. */
    private static String hostAndPort(InetSocketAddress address) {
        return address != null ? address.getHostString() + ":" + address.getPort() : null;
    }
}
